package com.whut.dao.impl;

import com.whut.dao.IHBaseRestDao;
import com.whut.domain.HbaseCell;
import com.whut.domain.HbaseRow;
import com.whut.domain.HbaseRows;
import com.whut.util.Base64Util;
import com.whut.util.HttpRequestUtil;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Created by YY on 2017-12-29.
 */
@Component
public class HBaseRestDaoImpl implements IHBaseRestDao {
    
    private  static String uriBase = "http://59.69.101.206:6077/";

    /**
     * 获取所有表名
     *
     * @return 表名
     */
    @Override
    public  JSONArray getHTableArray() {
        Map<Integer, String> map = HttpRequestUtil.httpGet(uriBase);
        JSONArray jsonArray = null;
        if (map.containsKey(200)) {
            try {
                jsonArray = new JSONObject(map.get(200)).getJSONArray("table");
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
        return jsonArray;
    }

    /**
     * 列出表的域(regions)
     *
     * @param tableName 输入表名
     * @return 结构
     */
    @Override
    public  JSONObject getHTableRegions(String tableName) {
        Map<Integer, String> map = HttpRequestUtil.httpGet(uriBase.concat(tableName).concat("/regions"));
        JSONObject jsonObject = null;
        if (map.containsKey(200)) {
            try {
                jsonObject = new JSONObject(map.get(200));
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
        return jsonObject;
    }

    /**
     * 获取某个表的结构
     *
     * @param tableName 输入表名
     * @return 结构
     */
    @Override
    public  JSONObject getHTableSchema(String tableName) {
        Map<Integer, String> map = HttpRequestUtil.httpGet(uriBase.concat(tableName).concat("/schema"));
        JSONObject object = null;
        if (map.containsKey(200)) {
            try {
                object = new JSONObject(map.get(200));
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
        return object;
    }

    /**
     * 通过JSON格式创建一张表
     *
     * @param tableName 输入表名
     * @param strJson   JSON对象
     * @return 创建结果
     */
    @Override
    public  Map<Boolean, String> createHTableByJson(String tableName, JSONObject strJson) {
        Map<Boolean,String> result=new TreeMap<>();
        String strUrl = uriBase.concat(tableName).concat("/schema");
        Map<Integer, String> map = HttpRequestUtil.httpPutByJson(strUrl, strJson);
        String str="";
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (map.containsKey(200)||map.containsKey(201))
        {
            result.put(true,str);
        }
        else
        {
            result.put(false,str);
        }
        return result;
    }

    /**
     * 通过JSON格式更新表结构
     *
     * @param tableName 输入表名
     * @param strJson   JSON对象
     * @return 更新结果
     */
    @Override
    public  Map<Boolean, String> updateHTableByJson(String tableName, JSONObject strJson) {
        Map<Boolean,String> result=new TreeMap<>();
        String strUrl = uriBase.concat(tableName).concat("/schema");
        Map<Integer, String> map = HttpRequestUtil.httpPostByJson(strUrl, strJson);
        String str="";
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (map.containsKey(200)||map.containsKey(201))
        {
            result.put(true,str);
        }
        else
        {
            result.put(false,str);
        }
        return result;
    }

    /**
     * 通过表名删除一张表
     *
     * @param tableName 输入表名
     * @return 删除结果
     */
    @Override
    public  Map<Boolean, String> deleteHTable(String tableName) {
        String strUrl = uriBase.concat(tableName).concat("/schema");
        Map<Integer, String> map = HttpRequestUtil.httpDelete(strUrl);
        String str="";
        Map<Boolean,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (map.containsKey(200)||map.containsKey(201))
        {
            result.put(true,str);
        }
        else
        {
            result.put(false,str);
        }
        return result;
    }

    /**
     * 获得一行
     *
     * @param tableName 输入表名
     * @param rowkey    行键
     * @return 指定行的Json对象
     */
    @Override
    public  Map<Boolean, String> findRow(String tableName, String rowkey) {

        String strUrl = uriBase.concat(tableName).concat("/").concat(rowkey);
        Map<Integer, String> map = HttpRequestUtil.httpGet(strUrl);
        String str="";
        Map<Boolean,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (map.containsKey(200)||map.containsKey(201))
        {
            result.put(true,str);
        }
        else
        {
            result.put(false,str);
        }
        return result;
    }

    /**
     * 获得多行
     *
     * @param tableName    输入表名
     * @param prefixFilter 行键的前缀
     * @param maxCount     最大的cell数量，注意不是行的数量
     * @return 查找到的Json对象
     */
    @Override
    public  Map<Boolean, String> findRowsByprefixFilter(String tableName, String prefixFilter, int maxCount) {
        String strUrl = uriBase.concat(tableName).concat("/").concat("scanner/");
        String scannerUrl = "";
        String xmlStr = "<Scanner batch=" + '"' + maxCount + '"' + ">";
        xmlStr += "<filter>{";
        xmlStr += ('"' + "type" + '"' + ":" + '"' + "PrefixFilter" + '"' + ',');
        xmlStr += ('"' + "value" + '"' + ":" + '"' + prefixFilter + '"' + '}');
        xmlStr += "</filter></Scanner>";
        System.out.println(xmlStr);
        Map<Integer, String> map = HttpRequestUtil.httpPutReHeaderByXml(strUrl, xmlStr, "Location");
        Map<Integer,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            scannerUrl=map.get(code);
            result = HttpRequestUtil.httpGet(scannerUrl);
            HttpRequestUtil.httpDelete(scannerUrl);
        }
        String str="";
        Map<Boolean,String> result1=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (result.containsKey(200)||result.containsKey(201))
        {
            result1.put(true,str);
        }
        else
        {
            result1.put(false,str);
        }
        return result1;
    }

    /**
     * 获得多行
     *
     * @param tableName    输入表名
     * @param suffixFilter 行键的后缀
     * @param maxCount     最大的cell数量，注意不是行的数量
     * @return 查找到的Json对象
     */
    @Override
    public  Map<Boolean, String> findRowsBySuffixFilter(String tableName, String suffixFilter, int maxCount) {
        String strUrl = uriBase.concat(tableName).concat("/").concat("scanner/");
        String scannerUrl = "";
        String xmlStr = "<Scanner batch=" + '"' + maxCount + '"' + ">";
        xmlStr += "<filter>{";
        xmlStr += ('"' + "type" + '"' + ":" + '"' + "SuffixFilter" + '"' + ',');
        xmlStr += ('"' + "value" + '"' + ":" + '"' + suffixFilter + '"' + '}');
        xmlStr += "</filter></Scanner>";
        System.out.println(xmlStr);
        Map<Integer, String> map = HttpRequestUtil.httpPutReHeaderByXml(strUrl, xmlStr, "Location");
        Map<Integer,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            scannerUrl=map.get(code);
            result = HttpRequestUtil.httpGet(scannerUrl);
            HttpRequestUtil.httpDelete(scannerUrl);
        }
        String str="";
        Map<Boolean,String> result1=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (result.containsKey(200)||result.containsKey(201))
        {
            result1.put(true,str);
        }
        else
        {
            result1.put(false,str);
        }
        return result1;
    }

    @Override
    public Map<Boolean, String> findRowsByRangFilter(String tableName, String startRow, String endRow) {

        String strUrl = uriBase.concat(tableName).concat("/").concat("scanner/");
        String scannerUrl = "";
        String xmlStr = "<Scanner";
        xmlStr += (" startRow="+ '"'+Base64Util.encodeByBase64(startRow)+'"');
        xmlStr += (" endRow="+ '"'+Base64Util.encodeByBase64(endRow)+'"');
        xmlStr += "></Scanner>";
        System.out.println(xmlStr);
        Map<Integer, String> map = HttpRequestUtil.httpPutReHeaderByXml(strUrl, xmlStr, "Location");
        Map<Integer,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            scannerUrl=map.get(code);
            result = HttpRequestUtil.httpGet(scannerUrl);
            HttpRequestUtil.httpDelete(scannerUrl);
        }
        String str="";
        Map<Boolean,String> result1=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (result.containsKey(200)||result.containsKey(201))
        {
            result1.put(true,str);
        }
        else
        {
            result1.put(false,str);
        }
        return result1;
    }

    /**
     * 获得一列
     *
     * @param tableName    输入表名
     * @param rowkey       行键名
     * @param columnFamily 列族名
     * @param column       列名
     * @return 指定列的Json对象
     */
    @Override
    public  Map<Boolean, String> findCell(String tableName, String rowkey, String columnFamily, String column) {

        String strUrl = uriBase.concat(tableName).concat("/").concat(rowkey).concat("/").
                concat(columnFamily).concat(":").concat(column);
        Map<Integer, String> map = HttpRequestUtil.httpGet(strUrl);
        String str="";
        Map<Boolean,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (map.containsKey(200)||map.containsKey(201))
        {
            result.put(true,str);
        }
        else
        {
            result.put(false,str);
        }
        return result;
    }

    /**
     * 获得一列的前n个版本
     *
     * @param tableName    输入表名
     * @param rowkey       行键名
     * @param columnFamily 列族名
     * @param column       列名
     * @param n            前n个版本
     * @return 指定列的Json对象
     */
    @Override
    public  Map<Boolean, String> findMultilVersionCell(String tableName, String rowkey, String columnFamily, String column, int n) {

        String strUrl = uriBase.concat(tableName).concat("/").concat(rowkey).concat("/").
                concat(columnFamily).concat(":").concat(column).concat("?v=") + n;
        Map<Integer, String> map = HttpRequestUtil.httpGet(strUrl);
        String str="";
        Map<Boolean,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (map.containsKey(200)||map.containsKey(201))
        {
            result.put(true,str);
        }
        else
        {
            result.put(false,str);
        }
        return result;
    }

    /**
     * 插入一个单元cell
     *
     * @param tableName    输入表名
     * @param rowkey       行键
     * @param columnfamily 列族
     * @param column       列
     * @param value        值
     * @return 插入结果
     */
    @Override
    public  Map<Boolean, String> insertCell(String tableName, String rowkey, String columnfamily, String column, String value) {
        String strUrl = uriBase.concat(tableName).concat("/").concat(rowkey);
        HbaseCell hbaseCell = new HbaseCell(columnfamily, column, value);
        HbaseRow hbaseRow = new HbaseRow(rowkey, hbaseCell);
        HbaseRows hbaseRows = new HbaseRows(hbaseRow);
        return insertRow("test", rowkey, hbaseRows.getHbaseRowsJson());
    }

    /**
     * 插入一行
     *
     * @param tableName  输入表名
     * @param rowkey     行键
     * @param jsonObject 插入的值
     * @return 插入结果
     */
    @Override
    public  Map<Boolean, String> insertRow(String tableName, String rowkey, JSONObject jsonObject) {
        String strUrl = uriBase.concat(tableName).concat("/").concat(rowkey);
        Map<Integer, String> map = HttpRequestUtil.httpPutByJson(strUrl, jsonObject);
        String str="";
        Map<Boolean,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (map.containsKey(200)||map.containsKey(201))
        {
            result.put(true,str);
        }
        else
        {
            result.put(false,str);
        }
        return result;
    }
    /**
     * 插入多行
     *
     * @param tableName  输入表名
     * @param jsonObject 插入的值
     * @return 插入结果
     */
    @Override
    public  Map<Boolean, String> insertRows(String tableName, JSONObject jsonObject) {
        String strUrl = uriBase.concat(tableName).concat("/").concat("rows");
        Map<Integer, String> map = HttpRequestUtil.httpPutByJson(strUrl, jsonObject);
        String str="";
        Map<Boolean,String> result=new TreeMap<>();
        for(Integer code:map.keySet())
        {
            str=map.get(code);
        }
        if (map.containsKey(200)||map.containsKey(201))
        {
            result.put(true,str);
        }
        else
        {
            result.put(false,str);
        }
        return result;
    }

    @Override
    public Map<Boolean,Map<String, String>> findFixColumnsBetweenRow(String tableName, String startRow, String endRow,String columnFamily,String column) {

        Map<String, String> outputRowkeyAndValue=new TreeMap<>();
        Map<Boolean,String> map = findRowsByRangFilter(tableName,startRow,endRow);
        Boolean result=false;
        for(Boolean key:map.keySet())
            result=key;
        if(result)
        {
            List<HbaseRow> hbaseRowList = HbaseRows.parseHBaseRowsJson(map.get(result));
            for(HbaseRow hbaseRow:hbaseRowList)
            {
                List<HbaseCell> hbaseCellList=hbaseRow.getHbaseCellList();
                for(HbaseCell hbaseCell:hbaseCellList)
                {
                    if(hbaseCell.columnFamily==columnFamily&&hbaseCell.column==column)
                    {
                        outputRowkeyAndValue.put(hbaseRow.getRowKey(),hbaseCell.value);
                    }
                }
            }
        }
        Map<Boolean,Map<String, String>> result1=new TreeMap<>();
        result1.put(result,outputRowkeyAndValue);
        return result1;
    }

    //兼容原版的Hbase操作
    //插入单个值
    @Override
    public Boolean add(String tableName, String rowKey, String family, String qualifier, String value) {
        Boolean result=false;
        Map<Boolean,String> map= insertCell(tableName,rowKey,family,qualifier,value);
        for(Boolean key:map.keySet())
            result=key;
        return result;
    }

    //插入多个值
    @Override
    public String find(String tableName, String rowKey, String family, String qualifier) {
        Boolean result=false;
        Map<Boolean,String> map= findCell(tableName,rowKey,family,qualifier);
        for(Boolean key:map.keySet())
            result=key;
        return map.get(result);
    }

    //插入一行，多列
    @Override
    public Boolean adds(String tableName, String rowKey, String family, Map<String, String> quaAndValue) {

        HbaseRow hbaseRow=new HbaseRow(rowKey);
        for(String column:quaAndValue.keySet())
        {
            hbaseRow.addHbaseCell(new HbaseCell(family,column,quaAndValue.get(column)));
        }
        HbaseRows hbaseRows=new HbaseRows(hbaseRow);
        Boolean result=false;
        Map<Boolean,String> map= insertRow(tableName,rowKey,hbaseRows.getHbaseRowsJson());
        for(Boolean key:map.keySet())
            result=key;
        return result;
    }

    //插入多行一列
    @Override
    public Boolean adds(String tableName, Map<String, String> rowkeyAndValue, String family, String qualifier) {
        HbaseRows hbaseRows=new HbaseRows();
        for(String rowkey:rowkeyAndValue.keySet())
        {
            hbaseRows.addHbaseRow(new HbaseRow(rowkey,new HbaseCell(family,qualifier,rowkeyAndValue.get(rowkey))));
        }
        Boolean result=false;
        Map<Boolean,String> map= insertRows(tableName,hbaseRows.getHbaseRowsJson());
        for(Boolean key:map.keySet())
            result=key;
        return result;
    }

    @Override
    public Boolean finds(String tableName, String startRow, String stopRow, Map<String, String> outputRowkeyAndValue) {

        String strUrl = uriBase.concat(tableName).concat("/").concat("scanner/");
        String scannerUrl = "";
        String xmlStr = "<Scanner";
        xmlStr += (" startRow="+ '"'+Base64Util.encodeByBase64(startRow)+'"');
        xmlStr += (" endRow="+ '"'+Base64Util.encodeByBase64(stopRow)+'"');
        xmlStr += "></Scanner>";
        System.out.println(xmlStr);
        Map<Integer, String> map = HttpRequestUtil.httpPutReHeaderByXml(strUrl, xmlStr, "Location");
        Map<Integer,String> result=new TreeMap<>();
        String str="";
        for(Integer code : map.keySet())
            str=map.get(code);
        if (map.containsKey(200)||map.containsKey(201)) {
            scannerUrl = str;
            result = HttpRequestUtil.httpGet(scannerUrl);
            HttpRequestUtil.httpDelete(scannerUrl);
        }
        for(Integer code : result.keySet())
            str=map.get(code);
        if (result.containsKey(200)||result.containsKey(201))
        {
            List<HbaseRow> hbaseRowList = HbaseRows.parseHBaseRowsJson(str);
            for(HbaseRow hbaseRow:hbaseRowList)
            {
                List<HbaseCell> hbaseCellList=hbaseRow.getHbaseCellList();
                if(!hbaseCellList.isEmpty())
                {
                    outputRowkeyAndValue.put(hbaseRow.getRowKey(),hbaseCellList.get(0).value);
                }
            }
            return true;
        }
        return false;
    }

}

